/*
 *  Filename:lsv_log_gc_gc_heap.c
 *  Description:
 *
 *  Created on: 2017年3月27日
 *  Author: Asdf(825674301)
 */

#include "lsv.h"
#include "lsv_conf.h"
#include "lsv_help.h"
#include "lsv_log.h"
#include "lsv_volume.h"
#include "lsv_gc_internal.h"

/**
 * @brief bottom up，对应插入元素, 或者，元素的属性发生变化，需要重新调整堆结构
 *
 * @param gc_heap
 * @param pos
 */
static void lsv_gc_heap_bottom_up(lsv_gc_heap_t *gc_heap, lsv_u32_t pos) {
        lsv_gc_logctrl_t **heap;
        lsv_gc_logctrl_t *logctrl;
        lsv_u32_t parent;

        heap = gc_heap->heap;
        logctrl = heap[pos];
        while (pos > 0) {
                parent = (pos - 1) >> 1;
                if (heap[parent]->gc_value >= logctrl->gc_value) {
                        break;
                }
                heap[pos] = heap[parent];
                pos = parent;
        }
        heap[pos] = logctrl;
}

static int __heap_insert(lsv_gc_heap_t *gc_heap, lsv_gc_logctrl_t *logctrl) {
        YASSERT(logctrl->head.page_count<=LSV_LOG_PAGE_NUM);

        gc_heap->heap[gc_heap->count] = logctrl;
        lsv_gc_heap_bottom_up(gc_heap, gc_heap->count);
        gc_heap->count++;

        return 0;
}

/**
 * @brief top down，对应移入堆顶元素
 *
 * @param gc_heap
 * @param pos
 */
static inline void __lsv_gc_heap_top_down(lsv_gc_heap_t *gc_heap, lsv_u32_t pos) {
        lsv_gc_logctrl_t **heap;
        lsv_gc_logctrl_t *logctrl;
        lsv_u32_t child, right, half;

        heap = gc_heap->heap;
        half = gc_heap->count >> 1;
        logctrl = heap[pos];
        while (pos < half) {
                child = (pos << 1) + 1;
                right = child + 1;
                if (right < LSV_GC_HEAP_SIZE && heap[right]->gc_value > heap[child]->gc_value) {
                        child = right;
                }
                if (logctrl->gc_value >= heap[child]->gc_value) {
                        break;
                }
                heap[pos] = heap[child];
                pos = child;
        }
        heap[pos] = logctrl;
}

static inline void __heap_pop(lsv_gc_heap_t *gc_heap) {
        gc_heap->heap[0] = gc_heap->heap[--gc_heap->count];
        __lsv_gc_heap_top_down(gc_heap, 0);
}

void lsv_gc_heap_valueup(lsv_gc_heap_t *gc_heap, lsv_gc_logctrl_t *logctrl) {
        if (logctrl->gc_value >= 0xFF) {
                return;
        }

        logctrl->gc_value++;

        uint32_t pos;
        uint32_t chunk_id = logctrl->chunk_id;

        // complexity: O(n)
        for (pos = 0; pos < gc_heap->count; pos++) {
                if (gc_heap->heap[pos]->chunk_id == chunk_id) {
                        break;
                }
        }

        if (pos < gc_heap->count) {
                DINFO("change a log in gc_heap. chunk_id:%u,gc_value:%u\n", chunk_id, logctrl->gc_value);
                YASSERT(gc_heap->heap[pos]->chunk_id == chunk_id);
                lsv_gc_heap_bottom_up(gc_heap, pos);
        }
}

int lsv_gc_heap_insert(lsv_volume_proto_t *lsv_info, lsv_gc_logctrl_t* logctrl) {
        lsv_log_info_t *log_info = NULL;
        lsv_gc_context_t *gc_context = NULL;
        lsv_gc_heap_t *gc_heap = NULL;
        lsv_gc_old_storage_t *old_storage = NULL;
        lsv_gc_logctrl_t *storage_logctrl;

        log_info = lsv_info->log_info;
        gc_context = log_info->gc_context;
        gc_heap = &gc_context->gc_heap;
        old_storage = &gc_context->old_storage;

        lsv_list_head_t storage, *pos, *next;
        INIT_LIST_HEAD(&storage);

        CHUNK_HISTORY("insert into heap", logctrl);

        // 插入元素
        __heap_insert(gc_heap, logctrl);

        //check heap
        if (LSV_GC_HEAP_SIZE == gc_heap->count) {
                // heap满，移除一半到old storage
                gc_heap->count = LSV_GC_HEAP_SIZE >> 1;   // 511
                for (uint32_t i = gc_heap->count; i < LSV_GC_HEAP_SIZE; i++) {
                        DINFO("to old storage, chunk_id:%u\n", gc_heap->heap[i]->chunk_id);
                        list_add_tail(&gc_heap->heap[i]->hook, &storage);
                        gc_heap->heap[i] = NULL;
                }

                list_for_each_safe(pos, next, &storage) {
                        storage_logctrl = (lsv_gc_logctrl_t*) pos;
                        list_del_init(pos);

                        lsv_gc_old_storage_remove_and_add(lsv_info, storage_logctrl->chunk_id, 0);
                }

                lsv_lock(&old_storage->lock);
                lsv_gc_old_storage_head_set(lsv_info);
                lsv_unlock(&old_storage->lock);

                lsv_gc_fullgc_check(lsv_info);
        }

        return 0;
}

static inline int __lsv_gc_wait_queue_offer(lsv_gc_wait_queue_t *wait_queue, lsv_gc_logctrl_t* logctrl) {
        CHUNK_HISTORY("add to gc wait queue", logctrl);

        list_add_tail(&logctrl->hook, &wait_queue->queue);
        wait_queue->count++;
        wait_queue->value += LSV_LOG_PAGE_NUM - logctrl->lp_bm.inactive_page_count;
        return 0;
}

static inline int __lsv_gc_wait_queue_poll(lsv_gc_wait_queue_t *wait_queue, lsv_gc_logctrl_t** logctrl) {
        *logctrl = (lsv_gc_logctrl_t *) wait_queue->queue.next;
        list_del_init(wait_queue->queue.next);
        wait_queue->value -= LSV_LOG_PAGE_NUM - (*logctrl)->lp_bm.inactive_page_count;
        wait_queue->count--;

        CHUNK_HISTORY("poll gc wait queue", (*logctrl));
        return 0;
}

static inline int __lsv_gc_wait_queue_polllast(lsv_gc_wait_queue_t *wait_queue,
                lsv_gc_logctrl_t** logctrl) {
        *logctrl = (lsv_gc_logctrl_t *) wait_queue->queue.prev;
        list_del_init(wait_queue->queue.prev);
        wait_queue->count--;
        wait_queue->value -= LSV_LOG_PAGE_NUM - (*logctrl)->lp_bm.inactive_page_count;
        return 0;
}

/**
 * @brief 从heap中，移除logctrl到wait queue，对可整体回收的chunk，整体回收之
 *
 * @param lsv_info
 * @param type
 * @param wait_queue
 */
static void __lsv_gc_heap_pop(lsv_volume_proto_t* lsv_info, lsv_u8_t type, lsv_gc_wait_queue_t* wait_queue) {
        lsv_log_info_t *log_info = NULL;
        lsv_gc_context_t *gc_context = NULL;
        lsv_gc_heap_t *gc_heap = NULL;
        lsv_gc_logctrl_t* logctrl = NULL;
        struct list_head free_queue, *pos, *next;
        float logctrl_check;

        // CHECK_CONCURRENCY_BEGIN();

        log_info = lsv_info->log_info;
        gc_context = log_info->gc_context;
        gc_heap = &gc_context->gc_heap;

        INIT_LIST_HEAD(&free_queue);

        //get log that can gc, to wait_queue
        while (gc_heap->count > 0 && wait_queue->value < LSV_LOG_PAGE_NUM) {
                // 取最大堆的第一项
                logctrl = gc_heap->heap[0];

                logctrl_check = logctrl->head.page_count * LSV_GC_DIRTY_RATIO;
                YASSERT(logctrl->head.page_count<=LSV_LOG_PAGE_NUM);

                DINFO("pop from wait queue, chunk_id %u gc_value %u gc_count %u inactive_page_count %u page_count %u check %.2f\n",
                      logctrl->chunk_id,
                      logctrl->gc_value,
                      logctrl->gc_count,
                      logctrl->lp_bm.inactive_page_count,
                      logctrl->head.page_count,
                      logctrl_check);

                // 1. 有可回收页
                // 2. 强制回收，或可回收页大于指定页数
                if (lsv_gc_logctrl_is_inactive(logctrl)) {
                        // fast gc
                        __heap_pop(gc_heap);

                        CHUNK_HISTORY("add to free queue", logctrl);
                        list_add_tail(&logctrl->hook, &free_queue);
                } else if ((logctrl->lp_bm.inactive_page_count > 0) &&
                        // thorough gc
                           ((type & LSV_GC_TYPE_BIT_FORCE) || (logctrl->lp_bm.inactive_page_count >= logctrl_check))) {
                        __heap_pop(gc_heap);

                        DINFO_NOP("add to wait queue, chunk_id %u wait_queue.value %u wait_queue.count %u\n",
                                  logctrl->chunk_id,
                                  wait_queue->value,
                                  wait_queue->count);
                        __lsv_gc_wait_queue_offer(wait_queue, logctrl);
                } else if (logctrl->gc_value >= logctrl_check) {
                        __heap_pop(gc_heap);

                        DINFO("return to check queue, chunk_id %u\n", logctrl->chunk_id);
                        _lsv_gc_check_queue_offer(&gc_context->check_queue, logctrl);
                } else {
                        // 没有可回收的logctrl，heap不变
                        DINFO("no log to gc, chunk_id:%u\n", logctrl->chunk_id);
                        break;
                }
        }

        // CHECK_CONCURRENCY_END();

        // @sa NOVA: fast gc
        list_for_each_safe(pos, next, &free_queue) {
                logctrl = (lsv_gc_logctrl_t*) pos;
                list_del_init(pos);

                lsv_gc_log_free(lsv_info, logctrl->chunk_id);
        }
}

static int __gc_merge_write_new_chunk(lsv_volume_proto_t *lsv_info, lsv_bitmap_update_t *gc_bitmap_update,
                                      lsv_log_proto_t *gc_log_proto, lsv_gc_logctrl_t **new_logctrl) {
        int rc;
        lsv_gc_logctrl_t *logctrl;

        lsv_log_check(gc_log_proto->chunk_id, gc_log_proto->log);

        *new_logctrl = NULL;

        //create gc_logctrl
        rc = lsv_gc_logctrl_create(lsv_info, 1, *gc_log_proto, &logctrl);
        if (rc < 0) {
                DERROR("create logctrl err,errno:%d\n", rc);
                goto ERR0;
        }

        // gc write
        lsv_volume_io_t vio;
        lsv_volume_io_init(&vio, gc_log_proto->chunk_id, 0, LSV_CHUNK_SIZE, LSV_LOG_LOG_STORAGE_TYPE);

        // update head crc
        gc_log_proto->head->crc = page_crc32(gc_log_proto->log);

        rc = lsv_volume_chunk_update(lsv_info, &vio, gc_log_proto->log);
        if (rc < 0) {
                YASSERT(0);
        }

        // gc bitmap
        rc = lsv_bitmap_start_session(lsv_info);
        if (unlikely(rc)) {
                YASSERT(0);
        }

        for (int i = 0; i < gc_log_proto->head->page_count; i++) {
                DINFO("refmap write %u/%u lba %llu crc %u chunk_id %u page %u new_chunk_id %u new_page %u snap_id %u\n",
                      i,
                      gc_log_proto->head->page_count,
                      (LLU)gc_bitmap_update[i].lba,
                      gc_log_proto->hlog[i].crc,
                      gc_bitmap_update[i].old_chunk_id,
                      gc_bitmap_update[i].old_chunk_page_off - 1,
                      gc_bitmap_update[i].new_chunk_id,
                      gc_bitmap_update[i].new_chunk_page_off - 1,
                      gc_bitmap_update[i].snap_id);

                rc = lsv_bitmap_lba_set(lsv_info, gc_bitmap_update + i);
                if (rc < 0) {
                        YASSERT(0);
                } else if (rc == 1) {
                        // 此返回值重要，不该返回1的返回1，会造成误回收.
                        // TODO merge的过程中，某些页可能会变脏, 可以跳过gc check
                        // 后来执行gc check的时候，lp_bm有值
                        logctrl->gc_value++;
                        lp_bitmap_set_inactive(&logctrl->lp_bm, i);
                        DINFO("gc a dirty page, chunk_id %u bm %u lba:%llu \n",
                              logctrl->chunk_id,
                              logctrl->lp_bm.inactive_page_count,
                              (LLU )gc_bitmap_update[i].lba);
                }
        }

        rc = lsv_bitmap_commit_dirty(lsv_info);
        if (unlikely(rc)) {
                YASSERT(0);
        }

        *new_logctrl = logctrl;
        return 0;
ERR0:
        return rc;
}

static int __lsv_gc_heap_do_merge(lsv_volume_proto_t *lsv_info, const lsv_gc_wait_queue_t* wait_queue,
                uint32_t chunk_id, lsv_gc_logctrl_t **new_logctrl) {
        int rc = 0;
        lsv_log_info_t *log_info = NULL;
        lsv_gc_context_t *gc_context = NULL;
        lsv_gc_logctrl_t *logctrl = NULL, *logctrl_check = NULL;
        struct list_head *pos;
        uint32_t page_idx;
        lsv_bitmap_update_t *gc_bitmap_update;
        lsv_log_proto_t log_proto, gc_log_proto;

        *new_logctrl = NULL;

        log_info = lsv_info->log_info;
        gc_context = log_info->gc_context;
        log_proto.log = NULL;

        // @malloc ERR1
        gc_bitmap_update = malloc(sizeof(lsv_bitmap_update_t) * LSV_LOG_PAGE_NUM);
        if (NULL == gc_bitmap_update) {
                rc = -ENOMEM;
                goto ERR0;
        }

        // @malloc ERR2
        log_proto.log = malloc(LSV_CHUNK_SIZE);
        if (NULL == log_proto.log) {
                rc = -ENOMEM;
                DERROR("malloc log_proto.log failed,errno:%d\n", rc);
                goto ERR1;
        }

        // @malloc ERR3
        gc_log_proto.log = malloc(LSV_CHUNK_SIZE);
        if (NULL == gc_log_proto.log) {
                rc = -ENOMEM;
                DERROR("malloc gc_log_proto.log failed,errno:%d\n", rc);
                GOTO(ERR2, rc);
        }

        lsv_log_proto_link(&gc_log_proto);

        gc_log_proto.chunk_id = chunk_id;

        gc_log_proto.head->timestamp = gettime();
        gc_log_proto.head->page_count = 0;
        gc_log_proto.head->crc = 0;

        list_for_each(pos, &wait_queue->queue) {
                logctrl = (lsv_gc_logctrl_t*) pos;

                DINFO("gc_wait_gc_queue each: chunk_id %u\n", logctrl->chunk_id);
                logctrl_check = (lsv_gc_logctrl_t*) hash_table_find(gc_context->logctrl_tab, &logctrl->chunk_id);
                YASSERT(NULL!=logctrl_check);

                // TODO loop
                rc = lsv_volume_chunk_read(lsv_info, logctrl->chunk_id, log_proto.log);
                if (rc < 0) {
                        DERROR("read err err,errno:%d\n", rc);
                        YASSERT(0);
                }
                lsv_log_proto_link(&log_proto);

                YASSERT(log_proto.head->page_count<=LSV_LOG_PAGE_NUM);

                CHUNK_HISTORY("gc merge :old", logctrl);

                lsv_log_check(logctrl->chunk_id, log_proto.log);

                for (uint32_t i = 0; i < log_proto.head->page_count && gc_log_proto.head->page_count < LSV_LOG_PAGE_NUM; i++) {
                        // TODO copy alive page to new log, mark these pages dead
                        // alive page
                        if (!lp_bitmap_get(&logctrl->lp_bm, i)) {
                                // YASSERT(log_proto.hlog[i].size==LSV_PAGE_SIZE);
                                page_idx = gc_log_proto.head->page_count;

                                gc_log_proto.hlog[page_idx] = log_proto.hlog[i];
                                gc_log_proto.slog[page_idx] = log_proto.slog[i];

                                gc_bitmap_update[page_idx].lba = log_proto.hlog[i].lba;
                                gc_bitmap_update[page_idx].old_chunk_id = logctrl->chunk_id;
                                gc_bitmap_update[page_idx].old_chunk_page_off = lsv_log_slogpos2chkoffset(i);
                                gc_bitmap_update[page_idx].new_chunk_id = gc_log_proto.chunk_id;
                                gc_bitmap_update[page_idx].new_chunk_page_off = lsv_log_slogpos2chkoffset(page_idx);

                                // TODO 保留snap_id不变，表示该页元数据记录在对应快照上
                                gc_bitmap_update[page_idx].snap_id = log_proto.hlog[i].snap_id;

                                // make dead
                                lp_bitmap_set_inactive(&logctrl->lp_bm, i);
#ifdef __PAGE_CRC32__
                                uint32_t crc1 = page_crc32(log_proto.slog[i].page);
                                uint32_t crc2 = page_crc32(gc_log_proto.slog[page_idx].page);
                                YASSERT(crc1 == crc2);
#else
				uint32_t crc1 = 0;
				uint32_t crc2 = 0;
#endif
                                DINFO("refmap write lba %llu chunk_id %u page %u -> chunk_id %u page %u crc %u %u\n",
                                      (LLU)log_proto.hlog[i].lba,
                                      logctrl->chunk_id,
                                      i,
                                      gc_log_proto.chunk_id,
                                      page_idx,
                                      crc1,
                                      crc2);

                                gc_log_proto.head->page_count++;
                        }
                }

                if (gc_log_proto.head->page_count == LSV_LOG_PAGE_NUM) {
                        break;
                }

                YASSERT(gc_log_proto.head->page_count < LSV_LOG_PAGE_NUM);
        }

        // @malloc ERR4
        rc = __gc_merge_write_new_chunk(lsv_info, gc_bitmap_update, &gc_log_proto, &logctrl);
        if (unlikely(rc)) {
                GOTO(ERR3, 0);
        }

        if (gc_log_proto.log) {
                free(gc_log_proto.log);
                gc_log_proto.log = NULL;
        }

        if (log_proto.log) {
                free(log_proto.log);
                log_proto.log = NULL;
        }

        if (gc_bitmap_update) {
                yfree((void **)&gc_bitmap_update);
        }

        *new_logctrl = logctrl;
        return rc;
ERR3:
        if (gc_log_proto.log) {
                free(gc_log_proto.log);
                gc_log_proto.log = NULL;
        }
ERR2:
        if (log_proto.log) {
                free(log_proto.log);
                log_proto.log = NULL;
        }
ERR1:
        if(gc_bitmap_update) {
                yfree((void **)&gc_bitmap_update);
        }
ERR0:
        return rc;
}

static void __lsv_gc_heap_free(lsv_volume_proto_t *lsv_info, lsv_gc_wait_queue_t* wait_queue,
                lsv_gc_logctrl_t *gc_logctrl) {

        lsv_log_info_t *log_info = NULL;
        lsv_gc_context_t *gc_context = NULL;
        lsv_gc_logctrl_t* logctrl = NULL, *logctrl_check = NULL;

        log_info = lsv_info->log_info;
        gc_context = log_info->gc_context;

        gc_logctrl->gc_count = LSV_GC_NUMTIMES + 1;
        while (wait_queue->count > 0) {
                __lsv_gc_wait_queue_poll(wait_queue, &logctrl);

                logctrl_check = (lsv_gc_logctrl_t*) hash_table_find(gc_context->logctrl_tab, &logctrl->chunk_id);
                YASSERT(NULL!=logctrl_check);

                if (logctrl->gc_count < gc_logctrl->gc_count) {
                        gc_logctrl->gc_count = logctrl->gc_count;
                }

                CHUNK_HISTORY("gc free", logctrl);
                if (lsv_gc_logctrl_is_inactive(logctrl)) {
                        lsv_gc_log_free(lsv_info, logctrl->chunk_id);
                } else {
                        YASSERT(logctrl->head.page_count > 0 &&
                                logctrl->head.page_count > logctrl->lp_bm.inactive_page_count);

                        logctrl->gc_count++;
                        logctrl->gc_value = logctrl->lp_bm.inactive_page_count;
                        lsv_gc_heap_insert(lsv_info, logctrl);
                }
        }

#ifdef LSV_GC_STATISTICS
        {
                lsv_gc_statistics_t *stat_info;
                stat_info = &((lsv_log_info_t*) lsv_info->log_info)->gc_context->stat_info;
                stat_info->storage_pages += gc_logctrl->head.page_count;
                // DINFO_LOGGC_STATISTICS(stat_info);
        }
#endif
}

/**
 * @brief gc merge
 *
 * @param lsv_info
 * @param type
 * @param wait_queue
 * @param logctrl_
 * @return
 */
static int __lsv_gc_heap_merge(lsv_volume_proto_t *lsv_info, lsv_u8_t type,
                                   lsv_gc_wait_queue_t* wait_queue,
                                   lsv_gc_logctrl_t** new_logctrl) {
        int rc = 0;
        lsv_log_info_t *log_info = NULL;
        lsv_gc_context_t *gc_context = NULL;
        lsv_gc_logctrl_t *gc_logctrl = NULL;
        uint32_t chunk_id;

        *new_logctrl = NULL;

        (void) type;
        log_info = lsv_info->log_info;
        gc_context = log_info->gc_context;

        lsv_info->share.gc_merge_enter_count++;
        DINFO_NOP("gc_merge_enter_count %u\n", lsv_info->share.gc_merge_enter_count);

#if LSV_GC_GC_CHUNK_TYPE==1
        //only gc a log
        while (wait_queue->value > LSV_LOG_PAGE_NUM) {
                __lsv_gc_wait_queue_polllast(wait_queue, &logctrl);
                _lsv_log_gc_heap_insert(lsv_info, logctrl);
                DLOG_GC_INFO("not gc tail log,chunk_id:%u\n", logctrl->chunk_id);
        }
#endif

        YASSERT(wait_queue->count > 1);

        // @malloc ERR1
        rc = lsv_volume_chunk_malloc(lsv_info, LSV_LOG_LOG_STORAGE_TYPE, &chunk_id);
        if (rc < 0) {
                DERROR("malloc volume chunk err,errno:%d\n", rc);
                GOTO(ERR0, rc);
        }

        DINFO("malloc a new log, chunk_id:%u\n", chunk_id);

        rc = __lsv_gc_heap_do_merge(lsv_info, wait_queue, chunk_id, &gc_logctrl);
        if (rc < 0) {
                DERROR("merge log failed,errno:%d\n", rc);
                GOTO(ERR1, rc);
        }

        __lsv_gc_heap_free(lsv_info, wait_queue, gc_logctrl);

        // finish
        gc_logctrl->gc_count++;

        CHUNK_HISTORY("gc merge :new", gc_logctrl);

        // 合成了新的logctrl
        rc = hash_table_insert(gc_context->logctrl_tab, gc_logctrl, &gc_logctrl->chunk_id, 0);
        if (unlikely(rc)) {
                YASSERT(0);
        }

        if (gc_logctrl->gc_count > LSV_GC_NUMTIMES) {
                lsv_gc_heap_insert(lsv_info, gc_logctrl);
        } else {
                _lsv_gc_check_queue_offer(&gc_context->check_queue, gc_logctrl);
        }

        *new_logctrl = gc_logctrl;

        gc_context->total_chunk_num++;
        lsv_info->share.gc_merge_enter_count--;
        return 0;
ERR1:
        lsv_volume_chunk_free(lsv_info, LSV_LOG_LOG_STORAGE_TYPE, chunk_id);
ERR0:
        lsv_info->share.gc_merge_enter_count--;
        return rc;
}

/**
 * @brief gc入口函数
 *
 * @todo 并发下，是否有问题？
 *
 * @param lsv_info
 * @param type
 * @param logctrl_  NULL
 * @return
 */
int lsv_gc_heap_do_gc(lsv_volume_proto_t *lsv_info, lsv_u8_t type, lsv_gc_logctrl_t** new_logctrl) {
        int ret = 0;
        lsv_gc_logctrl_t* logctrl = NULL;

        // 每个task，都有自己的本地副本
        lsv_gc_wait_queue_t wait_queue;

        // 此过程有yield，可中断，共享数据有什么？ CQ, heap etc，得到有效的保护了吗？

        INIT_LIST_HEAD(&wait_queue.queue);
        wait_queue.count = 0;
        wait_queue.value = 0;

        // get log that can gc, add to wait_queue
        __lsv_gc_heap_pop(lsv_info, type, &wait_queue);

        DINFO_NOP(":wait queue type %u value %u count %u\n",
                  type,
                  wait_queue.value,
                  wait_queue.count);

        *new_logctrl = NULL;

        if ((wait_queue.value >= LSV_LOG_PAGE_NUM) || ((type & LSV_GC_TYPE_BIT_FORCE) && (wait_queue.value > 0))) {
                ret = __lsv_gc_heap_merge(lsv_info, type, &wait_queue, new_logctrl);

        } else {
                // 不执行gc merge，放回heap
                while (wait_queue.count > 0) {
                        __lsv_gc_wait_queue_poll(&wait_queue, &logctrl);

                        CHUNK_HISTORY("not gc a log", logctrl);
                        lsv_gc_heap_insert(lsv_info, logctrl);
                }
        }

        YASSERT(wait_queue.count == 0);

        return ret;
}
